{
 "cells": [
  {
   "cell_type": "markdown",
   "source": [
    "# DataFrame transform example with the Spark SQL ReAct Agent\n",
    "\n",
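    "This example is adapted from the Databricks blog post [Introducing Window Functions in Spark SQL](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html).\n",
    "\n",
    "Each cell below passes a natural-language request to `df.ai.transform`. Because `SparkAI` is created with `verbose=True`, the output shows the agent's reasoning and the Spark SQL it generated, followed by the resulting table."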
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
      "23/08/15 11:28:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    }
   ],
   "source": [
    "from pyspark_ai import SparkAI\n",
    "\n",
    "spark_ai = SparkAI(verbose=True)\n",
    "spark_ai.activate()"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "outputs": [],
   "source": [
    "# Create the sample productRevenue table (product, category, revenue) from the blog post\n",
    "df = spark_ai._spark.createDataFrame(\n",
    "    [\n",
    "        (\"Normal\", \"Cellphone\", 6000),\n",
    "        (\"Normal\", \"Tablet\", 1500),\n",
    "        (\"Mini\", \"Tablet\", 5500),\n",
    "        (\"Mini\", \"Cellphone\", 5000),\n",
    "        (\"Foldable\", \"Cellphone\", 6500),\n",
    "        (\"Foldable\", \"Tablet\", 2500),\n",
    "        (\"Pro\", \"Cellphone\", 3000),\n",
    "        (\"Pro\", \"Tablet\", 4000),\n",
    "        (\"Pro Max\", \"Cellphone\", 4500)\n",
    "    ],\n",
    "    [\"product\", \"category\", \"revenue\"]\n",
    ")"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001B[92mINFO: \u001B[0mCreating temp view for the transform:\n",
      "df.createOrReplaceTempView(\u001B[33m\"\u001B[39;49;00m\u001B[33mspark_ai_temp_view_d566e4\u001B[39;49;00m\u001B[33m\"\u001B[39;49;00m)\u001B[37m\u001B[39;49;00m\n",
      "\n",
      "\n",
      "\n",
      "\u001B[1m> Entering new  chain...\u001B[0m\n",
      "\u001B[32;1m\u001B[1;3mThought: I will use the ROW_NUMBER() function to rank the products in each category by revenue in descending order. Then I will filter the results to only include the products with a rank of 1 or 2.\n",
      "Action: query_validation\n",
      "Action Input: \n",
      "```\n",
      "WITH ranked_products AS (\n",
      "  SELECT \n",
      "    product, \n",
      "    category, \n",
      "    revenue, \n",
      "    ROW_NUMBER() OVER (PARTITION BY category ORDER BY revenue DESC) as rank\n",
      "  FROM spark_ai_temp_view_d566e4\n",
      ")\n",
      "SELECT product, category, revenue\n",
      "FROM ranked_products\n",
      "WHERE rank IN (1, 2)\n",
      "```\u001B[0m\n",
      "Observation: \u001B[33;1m\u001B[1;3mOK\u001B[0m\n",
      "Thought:\u001B[32;1m\u001B[1;3mI now know the final answer.\n",
      "Final Answer: \n",
      "```\n",
      "WITH ranked_products AS (\n",
      "  SELECT \n",
      "    product, \n",
      "    category, \n",
      "    revenue, \n",
      "    ROW_NUMBER() OVER (PARTITION BY category ORDER BY revenue DESC) as rank\n",
      "  FROM spark_ai_temp_view_d566e4\n",
      ")\n",
      "SELECT product, category, revenue\n",
      "FROM ranked_products\n",
      "WHERE rank IN (1, 2)\n",
      "```\u001B[0m\n",
      "\n",
      "\u001B[1m> Finished chain.\u001B[0m\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Stage 0:>                                                        (0 + 10) / 10]\r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+---------+-------+\n",
      "| product| category|revenue|\n",
      "+--------+---------+-------+\n",
      "|Foldable|Cellphone|   6500|\n",
      "|  Normal|Cellphone|   6000|\n",
      "|    Mini|   Tablet|   5500|\n",
      "|     Pro|   Tablet|   4000|\n",
      "+--------+---------+-------+\n",
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "df.ai.transform(\"What are the best-selling and the second best-selling products in every category?\").show()"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001B[92mINFO: \u001B[0mCreating temp view for the transform:\n",
      "df.createOrReplaceTempView(\u001B[33m\"\u001B[39;49;00m\u001B[33mspark_ai_temp_view_8f41a9\u001B[39;49;00m\u001B[33m\"\u001B[39;49;00m)\u001B[37m\u001B[39;49;00m\n",
      "\n",
      "\n",
      "\n",
      "\u001B[1m> Entering new  chain...\u001B[0m\n",
      "\u001B[32;1m\u001B[1;3mThought: I need to find the revenue of the best-selling product in each category first. Then, I will join this result with the original table and calculate the difference in revenue.\n",
      "Action: query_sql_db\n",
      "Action Input: \"SELECT category, MAX(revenue) as max_revenue FROM spark_ai_temp_view_8f41a9 GROUP BY category\"\u001B[0m"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "Observation: \u001B[36;1m\u001B[1;3m[('Cellphone', '6500'), ('Tablet', '5500')]\u001B[0m\n",
      "Thought:\u001B[32;1m\u001B[1;3mI can join this result with the original table and calculate the difference in revenue.\n",
      "Action: query_validation\n",
      "Action Input: \n",
      "```\n",
      "SELECT \n",
      "    a.product, \n",
      "    a.category, \n",
      "    a.revenue, \n",
      "    b.max_revenue, \n",
      "    (b.max_revenue - a.revenue) as revenue_difference \n",
      "FROM \n",
      "    spark_ai_temp_view_8f41a9 a \n",
      "JOIN \n",
      "    (SELECT category, MAX(revenue) as max_revenue FROM spark_ai_temp_view_8f41a9 GROUP BY category) b \n",
      "ON \n",
      "    a.category = b.category\n",
      "```\u001B[0m\n",
      "Observation: \u001B[33;1m\u001B[1;3mOK\u001B[0m\n",
      "Thought:\u001B[32;1m\u001B[1;3mI now know the final answer.\n",
      "Final Answer: \n",
      "```\n",
      "SELECT \n",
      "    a.product, \n",
      "    a.category, \n",
      "    a.revenue, \n",
      "    b.max_revenue, \n",
      "    (b.max_revenue - a.revenue) as revenue_difference \n",
      "FROM \n",
      "    spark_ai_temp_view_8f41a9 a \n",
      "JOIN \n",
      "    (SELECT category, MAX(revenue) as max_revenue FROM spark_ai_temp_view_8f41a9 GROUP BY category) b \n",
      "ON \n",
      "    a.category = b.category\n",
      "```\u001B[0m\n",
      "\n",
      "\u001B[1m> Finished chain.\u001B[0m\n",
      "+--------+---------+-------+-----------+------------------+\n",
      "| product| category|revenue|max_revenue|revenue_difference|\n",
      "+--------+---------+-------+-----------+------------------+\n",
      "|  Normal|Cellphone|   6000|       6500|               500|\n",
      "|  Normal|   Tablet|   1500|       5500|              4000|\n",
      "|    Mini|   Tablet|   5500|       5500|                 0|\n",
      "|    Mini|Cellphone|   5000|       6500|              1500|\n",
      "|Foldable|Cellphone|   6500|       6500|                 0|\n",
      "|Foldable|   Tablet|   2500|       5500|              3000|\n",
      "|     Pro|Cellphone|   3000|       6500|              3500|\n",
      "|     Pro|   Tablet|   4000|       5500|              1500|\n",
      "| Pro Max|Cellphone|   4500|       6500|              2000|\n",
      "+--------+---------+-------+-----------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.ai.transform(\"What is the difference between the revenue of each product and the revenue of the best-selling product in the same category of that product?\").show()"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001B[92mINFO: \u001B[0mCreating temp view for the transform:\n",
      "df.createOrReplaceTempView(\u001B[33m\"\u001B[39;49;00m\u001B[33mspark_ai_temp_view_096b0f\u001B[39;49;00m\u001B[33m\"\u001B[39;49;00m)\u001B[37m\u001B[39;49;00m\n",
      "\n",
      "\n",
      "\n",
      "\u001B[1m> Entering new  chain...\u001B[0m\n",
      "\u001B[32;1m\u001B[1;3mThought: Spark SQL does not support dynamic pivot operations, which are required to transpose the table as requested. I should get all the distinct values of column product.\n",
      "Action: query_sql_db\n",
      "Action Input: \"SELECT DISTINCT product FROM spark_ai_temp_view_096b0f\"\u001B[0m\n",
      "Observation: \u001B[36;1m\u001B[1;3m[('Normal',), ('Mini',), ('Foldable',), ('Pro',), ('Pro Max',)]\u001B[0m\n",
      "Thought:\u001B[32;1m\u001B[1;3mI can write a query to pivot the table by product and sum the revenue for each product.\n",
      "Action: query_validation\n",
      "Action Input: SELECT * FROM spark_ai_temp_view_096b0f PIVOT (SUM(revenue) FOR product IN ('Normal', 'Mini', 'Foldable', 'Pro', 'Pro Max'))\u001B[0m\n",
      "Observation: \u001B[33;1m\u001B[1;3mOK\u001B[0m\n",
      "Thought:\u001B[32;1m\u001B[1;3mI now know the final answer.\n",
      "Final Answer: SELECT * FROM spark_ai_temp_view_096b0f PIVOT (SUM(revenue) FOR product IN ('Normal', 'Mini', 'Foldable', 'Pro', 'Pro Max'))\u001B[0m\n",
      "\n",
      "\u001B[1m> Finished chain.\u001B[0m\n",
      "+---------+------+----+--------+----+-------+\n",
      "| category|Normal|Mini|Foldable| Pro|Pro Max|\n",
      "+---------+------+----+--------+----+-------+\n",
      "|Cellphone|  6000|5000|    6500|3000|   4500|\n",
      "|   Tablet|  1500|5500|    2500|4000|   null|\n",
      "+---------+------+----+--------+----+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.ai.transform(\"Pivot the data by product and the revenue for each product\").show()"
   ],
   "metadata": {
    "collapsed": false
   }
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001B[92mINFO: \u001B[0mCreating temp view for the transform:\n",
      "df.createOrReplaceTempView(\u001B[33m\"\u001B[39;49;00m\u001B[33mspark_ai_temp_view_48845a\u001B[39;49;00m\u001B[33m\"\u001B[39;49;00m)\u001B[37m\u001B[39;49;00m\n",
      "\n",
      "\n",
      "\n",
      "\u001B[1m> Entering new  chain...\u001B[0m\n",
      "\u001B[32;1m\u001B[1;3mThought: Spark SQL does not support dynamic pivot operations, which are required to transpose the table as requested. I should get all the distinct values of column category.\n",
      "Action: query_sql_db\n",
      "Action Input: \"SELECT DISTINCT category FROM spark_ai_temp_view_48845a\"\u001B[0m\n",
      "Observation: \u001B[36;1m\u001B[1;3m[('Cellphone',), ('Tablet',)]\u001B[0m\n",
      "Thought:\u001B[32;1m\u001B[1;3mI can write a query to pivot the table by category and sum the revenue for each product and category combination.\n",
      "Action: query_validation\n",
      "Action Input: SELECT * FROM spark_ai_temp_view_48845a PIVOT (SUM(revenue) FOR category IN ('Cellphone', 'Tablet'))\u001B[0m\n",
      "Observation: \u001B[33;1m\u001B[1;3mOK\u001B[0m\n",
      "Thought:\u001B[32;1m\u001B[1;3mI now know the final answer.\n",
      "Final Answer: SELECT * FROM spark_ai_temp_view_48845a PIVOT (SUM(revenue) FOR category IN ('Cellphone', 'Tablet'))\u001B[0m\n",
      "\n",
      "\u001B[1m> Finished chain.\u001B[0m\n",
      "+--------+---------+------+\n",
      "| product|Cellphone|Tablet|\n",
      "+--------+---------+------+\n",
      "|    Mini|     5000|  5500|\n",
      "|     Pro|     3000|  4000|\n",
      "| Pro Max|     4500|  null|\n",
      "|Foldable|     6500|  2500|\n",
      "|  Normal|     6000|  1500|\n",
      "+--------+---------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.ai.transform(\"Pivot the data by category and the revenue for each product\").show()"
   ],
   "metadata": {
    "collapsed": false
   }
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
